Amazon DynamoDB zero-ETL integration with Amazon OpenSearch Service を試してみた
CDKでやってみた版はこちらをご参照ください。
はじめに
DynamoDBのクエリ柔軟性を補うために、OpenSearch Serviceの検索機能を利用するパターンは知られていましたが、以前はそのアーキテクチャのためにLambdaなどを用意する必要がありました。(参考1, 参考2)
参考1:
参考2:
今回のアップデートにより、Lambdaなどのグルーコードを用いずに、DynamoDBのテーブルをOpenSearch Serviceに同期することが可能になりました。
まずはDynamoDBを準備
以下のブログにあるCDKでDynamoDBを用意します。
やってみる
Ingestion Pipelineのコンソールを開く
DynamoDBのコンソールのサイドメニューに「Integrations」が増えているので、
Integrations
> 任意のテーブル > Create
の順でクリックしていきます。
以下のような画面が出てくるので、まずはロールなど、必要なリソースを作成します。 画面の手順に書かれている内容を確認してみましょう。
1: DynamoDBテーブルを選択する
OpenSearchのindexにレプリケートするDynamoDBテーブルを作成または選択します。PITR(ポイントインタイムリカバリ)とDynamoDBストリームは、あらかじめ有効にしておく必要があります。
こちらはすでに作成されています。また用意に用いたCDKの中でPITRとStreamの有効化も完了しています。
2: パーミッションの設定
pipelineがテーブルとそのstreamにアクセスするためのIAMロールと、テーブルのフルエクスポートに使用するAmazon S3バケットを作成します。
以下のドキュメントに従って、IAMロールを作成します。
参考3:
参考4:
3: OpenSearchの保存先を選択する
OpenSearchの保存先を選択します。デフォルトでは、テーブルをもとに名付けられたインデックスに、OpenSearchはDynamoDBのアイテムを動的にマッピングします。
この手順では、表示されているIngestion Pipelineのコンソールに対して入力していきます。 名前などは適当に入力していきますが、yamlを入力する箇所があるので、これについては以下のように入力する必要があります。
version: "2" dynamodb-pipeline: source: dynamodb: acknowledgments: true tables: # 必須: 用意したDynamoDBテーブルのARNをします。おそらく既に入力されているので、そのままでOKです。 - table_arn: <<dynamodbのARNを入力します>> # DynamoDB Streamsを用いてニアリアルタイムでデータを統合する場合は、以下のように設定します。 # 不要である場合は行ごと削除してください。 stream: start_position: "LATEST" # 既存データをまるごと統合する場合には、以下のように設定します。 # 不要である場合は行ごと削除してください。 export: # 必須: DynamoDBのデータをエクスポートするs3 bucketを指定します。入力する内容はbucket名です。(ARNではありません) s3_bucket: "<<my-bucket>>" # 必須:バケットのリージョンを指定します。 s3_region: "<<us-east-1>>" # 任意: DynamoDBからS3バケットにデータをエクスポートする際のプレフィックスを指定します。 s3_prefix: "ddb-to-opensearch-export/" aws: # 必須: 上記の手順2で作成したIAMロールのARNを指定します。 sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>" # pipelineこのロールを使用する際のクレデンシャルのregionを指定します。pipelineのリージョンと揃えておけば大丈夫です。 region: "ap-northeast-1" sink: - opensearch: # 必須: AWS OpenSearch endpointを指定します。 hosts: [ `https`から始まるAWS OpenSearch endpointを指定します。 OpenSearchドメインのwebコンソールで手に入ります。 ] index: "table-index" index_type: custom document_id: "${getMetadata(\"primary_key\")}" action: "${getMetadata(\"opensearch_action\")}" document_version: "${getMetadata(\"document_version\")}" document_version_type: "external" aws: # 必須: 上記の手順2で作成したIAMロールのARNを指定します。 sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>" # OpenSearchドメインのリージョンを指定します。 region: "<<us-east-1>>" # 以下の機能は筆者がまだ試せていません。追ってブログ化したいと考えています。。。 # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection # serverless: true # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the S3 DLQ to capture any failed requests in an S3 bucket. This is recommended as a best practice for all pipelines. # dlq: # s3: # Provide an S3 bucket # bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "dynamodb-pipeline/dlq" # Provide the region of the bucket. # region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com # sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
以上を設定して、Next
> Create Pipeline
の順でクリックします。
動作確認してみる
pipelineがActiveになっても、すぐにはデータ連携されないようです。 体感ですが10数分ほど待つと、データが連携が完了しました。データが多いほど時間がかかると思います。
既存データを確認する
まずは既存のデータが入力されていることを確認してみます。 OpenSearchのデータの確認にはOpenSearch Cliを利用します。 OpenSearch Dashboardが整備できている場合はそちらで確認するのがスムーズだと思います。
> opensearch-cli curl get -p play-opensearch -P table-index/_doc/205 --pretty { "_index" : "table-index-1", "_id" : "205", "_version" : 1701916967207000, "_seq_no" : 2, "_primary_term" : 1, "found" : true, "_source" : { "Id" : 205, "Title" : "18-Bike-204", "Price" : 500.0, "Brand" : "Brand-Company C", "Description" : "205 Description", "Color" : [ "Red", "Black" ], "ProductCategory" : "Bicycle", "BicycleType" : "Hybrid" } }
データが確認できました!
既存データを削除してみる
DynamoDBのデータの操作にはdyneinを利用します。
> dy del -t ProductCatalog 205 Successfully deleted an item from the table 'ProductCatalog'. > opensearch-cli curl get -p play-opensearch -P table-index/_doc/205 --pretty { "_id": "205", "_index": "table-index-2", "found": false }
データが削除されていることが確認できました! この連携は数秒で完了しているようです。連携時間にどれほどのばらつきがあるかは未検証です。
新規データを追加してみる
先程削除したデータを再投入してみます。
> dy put -t ProductCatalog 205 -i '{ "Brand": "Brand-Company C", "ProductCategory": "Bicycle", "Description": "205 Description", "Price": 500, "Id": 205, "BicycleType": "Hybrid", "Color": [ "Red", "Black" ], "Title": "18-Bike-204" }' Successfully put an item to the table 'ProductCatalog'. > opensearch-cli curl get -p play-opensearch -P table-index/_doc/205 --pretty { "_index" : "table-index-2", "_id" : "205", "_version" : 1701919401000000, "_seq_no" : 4, "_primary_term" : 1, "found" : true, "_source" : { "Brand" : "Brand-Company C", "Description" : "205 Description", "Price" : 500, "Color" : [ "Black", "Red" ], "ProductCategory" : "Bicycle", "Title" : "18-Bike-204", "Id" : 205, "BicycleType" : "Hybrid" } }
データが追加されていることが確認できました!
まとめ
以上の通り、既存データも新規操作もOpenSearchに連携されることが確認できました。
OpenSearchはとても多機能な検索エンジンなので、この機にいろいろ触ってみてはいかがでしょうか。(筆者はいろいろ勉強が必要であると痛感しています。。。)
以上でした!
余談: 設定したyamlは何者?
今回の設定に使用したyamlの仕様はどこに書いてあるのかというと、OSSとしてのOpenSearchのドキュメントに書いてありました。
OSSのOpenSearchの機能の中ではData Prepperと呼ばれる機能で、DynamoDB連携を実現する前から様々なデータソースからOpenSearchへとデータを統合する機能として存在していたようです。
yamlの書き方について不明な点がある場合は、上記のドキュメントを参照すると良いと思います。